const EventEmitter = require("events").EventEmitter;
const fs = require("fs");
class ReadStream extends EventEmitter {
  constructor(path, options) {
    super();
    this.path = path;
    this.flags = options.flags || "r";
    this.encoding = options.encoding || null;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.mode = options.mode || 0o666;
    this.autoClose = options.autoClose || true;
    this.start = options.start || 0;
    this.end = options.end;
    this.endFlag = false;

    this.flowing = null; //默认是暂停模式
    this.offset = 0;
    this.open();
    this.on("newListener", (type) => {
      if (type === "data") {
        this.flowing = true;
        this.read();
      }
    });
  }
  read() {
    if (typeof this.fd !== "number") {
      return this.once("open", this.read);
    }
    // console.log(this.fd);

    let howMuchToRead = this.end
      ? Math.min(this.highWaterMark, this.end - this.start + 1 - this.offset)
      : this.highWaterMark;
    let buffer = Buffer.alloc(howMuchToRead);
    fs.read(
      this.fd,
      buffer,
      0,
      howMuchToRead, //this.highWaterMark,
      this.offset,
      (err, bytesRead) => {
        this.offset += bytesRead;
        if (bytesRead > 0) {
          this.emit("data", buffer.slice(0, bytesRead));
          if (this.flowing) {
            this.read();
          }
        } else {
          this.endFlag = true;
          this.emit("end");
          this.close();
        }
      }
    );
  }
  open() {
    fs.open(this.path, this.flags, (err, fd) => {
      this.fd = fd;
      this.emit("open", this.fd);
    });
  }
  close() {
    if (this.autoClose) {
      fs.close(this.fd, () => {
        this.emit("close");
      });
    }
  }
  pause() {
    this.flowing = false;
  }
  resume() {
    this.flowing = true;
    if (!this.endFlag) {
      this.read();
    }
  }
  pipe(dest) {
    this.on("data", (data) => {
      // console.log(data);
      let flag = dest.write(data);
      if (!flag) {
        this.pause();
      }
    });

    dest.on("drain", () => {
      this.resume();
    });
    this.on("end", () => {
      dest.end();
    });
  }
}
module.exports = ReadStream;
